package com.galeno.练习

import com.alibaba.fastjson.{JSON, JSONObject}
import com.galeno.utils.SparkUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

/**
 * @Title: 漏斗模型 (funnel model)
 * @Description: Three-stage conversion-funnel analysis over app event logs:
 *               search -> add to cart -> submit order.
 * @author galeno
 * @date 2021/8/27 17:05
 */
object 漏斗模型 {

  /**
   * Funnel analysis over one day of app event logs.
   *
   * Stages (each stage only counts accounts that passed the previous one):
   *   1. accounts that searched for the keyword "咖啡"
   *   2. of those, accounts that added a product whose id starts with "101" to the cart
   *   3. of those, accounts that submitted an order
   *
   * NOTE(fix): the original version AND-ed `eventId == "search"` with a second
   * filter `eventId == "addCart"` on the same records, so stages 2 and 3 were
   * always 0. A funnel must correlate stages by account, which is what this
   * implementation does.
   */
  def main(args: Array[String]): Unit = {
    val sc: SparkContext = SparkUtil.getSc
    val log: RDD[String] = sc.textFile("F://applog/app_log_2021-06-08.log")

    // Parse each JSON log line into (account, eventId, keywords, productId).
    // fastjson's getString returns null for absent keys, so downstream
    // comparisons use Scala's null-safe `==` plus explicit null guards.
    val events: RDD[(String, String, String, String)] = log.map(json => {
      val obj: JSONObject = JSON.parseObject(json)
      val props: JSONObject = JSON.parseObject(obj.getString("properties"))
      (obj.getString("account"),
        obj.getString("eventId"),
        props.getString("keywords"),
        props.getString("productId"))
    })
    events.cache() // reused by all three funnel stages below

    // Stage 1: accounts with a non-empty id that searched for "咖啡".
    val searchedAccounts: Set[String] = events
      .filter { case (account, eventId, keywords, _) =>
        account != null && account.nonEmpty && eventId == "search" && keywords == "咖啡"
      }
      .map(_._1)
      .distinct()
      .collect()
      .toSet

    // Stage 2: of the stage-1 accounts, those that added a "101*" product to cart.
    // The small account set is captured in the task closure (Spark ships it to
    // executors); for very large funnels a broadcast variable would be preferable.
    val cartAccounts: Set[String] = events
      .filter { case (account, eventId, _, productId) =>
        searchedAccounts.contains(account) &&
          eventId == "addCart" &&
          productId != null && productId.startsWith("101")
      }
      .map(_._1)
      .distinct()
      .collect()
      .toSet

    // Stage 3: of the stage-2 accounts, those that submitted an order.
    val orderedCount: Long = events
      .filter { case (account, eventId, _, _) =>
        cartAccounts.contains(account) && eventId == "submitOrder"
      }
      .map(_._1)
      .distinct()
      .count()

    println(s"第一阶段${searchedAccounts.size}=====第二阶段${cartAccounts.size}====第三阶段${orderedCount}")
  }

}
